package com.zhanghe;

import java.util.Comparator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Timer
 *
 * @author Clevo
 * @date 2018/4/25 22:23
 */
public class PriorityQueueTimer implements Runnable {

	private Logger logger = LoggerFactory.getLogger(PriorityQueueTimer.class);

	private final ReadWriteLock lock = new ReentrantReadWriteLock();
	private final Lock r = lock.readLock();
	private final Lock w = lock.writeLock();
	//public volatile boolean RUNNING_STATUS = true;
	
	public Statistics statistics;
	/**
	 * 桶数量
	 */
	public volatile int barrel_size = 0;
	/**
	 * 桶数组
	 */
	public PriorityQueueBarrel[] mession_barrels =  null;
	/**
	 * 当前所在桶
	 */
	public volatile int current_barrel = 0;
	
	private ExecutorService exec;
	
	public AtomicLong mission_total_num = new AtomicLong(0);
	
	private long excute_mission_num = 0;
	
	public PriorityQueueTimer(int barrel_size ){
		super();
		this.barrel_size = barrel_size;
		this.current_barrel = 0;
		this.mession_barrels = new PriorityQueueBarrel[barrel_size];
		for(int i = 0;i<barrel_size;i++){
			Comparator<Mission> comparator = new Comparator<Mission>() {
				public int compare(Mission o1, Mission o2) {
					if(o1.getRepeattime()==o2.getRepeattime()){
						return 0;
					}else{
						return o1.getRepeattime()>o2.getRepeattime() ? 1 : -1;
					}
				}
			};
			PriorityQueueBarrel barrel = new PriorityQueueBarrel(new PriorityBlockingQueue<Mission>(10,comparator));
			this.mession_barrels[i] = barrel;
		}
		this.exec = Executors.newFixedThreadPool(5);
		this.statistics = new Statistics();
	}
	
	public PriorityQueueTimer(int barrel_size , int threadNum ){
		super();
		this.barrel_size = barrel_size;
		this.current_barrel = 0;
		this.mession_barrels = new PriorityQueueBarrel[barrel_size];
		for(int i = 0;i<barrel_size;i++){
			Comparator<Mission> comparator = new Comparator<Mission>() {
				public int compare(Mission o1, Mission o2) {
					if(o1.getRepeattime()==o2.getRepeattime()){
						return 0;
					}else{
						return o1.getRepeattime()>o2.getRepeattime() ? 1 : -1;
					}
				}
			};
			PriorityQueueBarrel barrel = new PriorityQueueBarrel(new PriorityBlockingQueue<Mission>(10,comparator));
			this.mession_barrels[i] = barrel;
		}
		this.exec = Executors.newFixedThreadPool(5);
		this.statistics = new Statistics();
	}
	
	public PriorityQueueTimer(int barrel_size , int current_barrel, int threadNum ){
		super();
		this.barrel_size = barrel_size;
		this.current_barrel = current_barrel;
		this.mession_barrels = new PriorityQueueBarrel[barrel_size];
		for(int i = 0;i<barrel_size;i++){
			Comparator<Mission> comparator = new Comparator<Mission>() {
				public int compare(Mission o1, Mission o2) {
					if(o1.getRepeattime()==o2.getRepeattime()){
						return 0;
					}else{
						return o1.getRepeattime()>o2.getRepeattime() ? 1 : -1;
					}
				}
			};
			PriorityQueueBarrel barrel = new PriorityQueueBarrel(new PriorityBlockingQueue<Mission>(10,comparator));
			this.mession_barrels[i] = barrel;
		}
		this.exec = Executors.newFixedThreadPool(threadNum);
		this.statistics = new Statistics();
	}
	/**
	 * 添加任务
	 * @param timeout
	 * @param mission
	 * @return
	 * @author Clevo
	 * @date 2018/4/25 22:56
	*/
	public void addmession(long timeout,RunnableMission mission){
		//mission_total_num.getAndAdd(1);
		statistics.addMission();
		//根据桶数量计算需要在当前桶之后哪个桶添加任务
		long stepout = timeout % barrel_size;
		int current = -1;
		r.lock();
		try{
			current = current_barrel;
		}finally {
			r.unlock();
		}
		int target_barrel_index = (int) ((current+stepout)%barrel_size);
		PriorityQueueBarrel targetbarrel = mession_barrels[target_barrel_index];
		//根据桶的重复次数计算repeattime
		long repeattime = targetbarrel.getRepeat_time()+ 1 +(timeout/barrel_size);
		mission.setStatistics(statistics);
		Mission missionobj = new Mission(mission, repeattime);
		//System.out.println("任务添加到第"+target_barrel_index+"个桶,stepout="+stepout+",repeattime="+repeattime);
		targetbarrel.getMissions().offer(missionobj);
	}
	@Override
	public void run() {
		long start_time = 0;
		long end_time = 0;
		long last_excute_time = 0;
		try {
			while(!Thread.currentThread().isInterrupted()){
				long current_time = System.currentTimeMillis();
				if((current_time-last_excute_time)<1000){
					continue;
				}
				last_excute_time = current_time;
				int current = -1;
				PriorityQueueBarrel currentbarrel = null;
				start_time = System.currentTimeMillis();
				w.lock();
				try{
					//一开始就将current_barrel加1 以免
					current = current_barrel;
					//获取当前所处理的桶
					currentbarrel = mession_barrels[current];
					currentbarrel.setRepeat_time(currentbarrel.getRepeat_time()+1);
					//处理下一个桶
					current_barrel++;
					if(current_barrel>=barrel_size){
						current_barrel = 0;
					}
				}finally {
					w.unlock();
				}
				end_time = System.currentTimeMillis();
				System.out.println("获取锁消耗时间:"+(end_time-start_time));
				logger.debug(" step into this barrel : {},barrel_repeat_time:{}",current,currentbarrel.getRepeat_time());
				System.out.println("step into this barrel:"+current+",barrel_repeat_time:"+currentbarrel.getRepeat_time());
				long bianli_start_time = System.currentTimeMillis();
				if(currentbarrel!=null){
					if(currentbarrel.getMissions()!=null){
						int size = currentbarrel.getMissions().size();
						int i = 0;
						System.out.println("桶任务数:"+size);
						//循环每一个任务 查看是否到达执行时间
						while(i<size){
							//从队列里取出第一个任务
							Mission mission = currentbarrel.getMissions().poll();
							if(mission.getRepeattime()>currentbarrel.getRepeat_time()){
								//放回任务
								currentbarrel.getMissions().offer(mission);
								break;
							}
							if(mission.getRepeattime() == currentbarrel.getRepeat_time()){
								//如果该任务repeattime=0 说明已经到了执行时间 执行任务
								exec.execute(mission.getMission());
								//new Thread(mission.getMission()).start();
								excute_mission_num++;
							}else{
								//如果该任务repeattime不等于currentbarrel.getRepeat_time() 说明任务已经被遗漏

							}
							i++;
						}
					}else{
						logger.debug(" this barrel has no missions: {}",current);
						System.out.println("this barrel has no missions:"+current);
					}

				}else{
					logger.debug(" this barrel is null: {}",current);
					System.out.println("this barrel is null:"+current);
				}
				long bianli_end_time = System.currentTimeMillis();
				System.out.println("遍历桶 消耗时间:"+(bianli_end_time-bianli_start_time));
				statistics.loop(bianli_end_time-bianli_start_time);
				//每一秒处理一个桶
				System.out.println("处理完成 时间:"+System.currentTimeMillis());
				Thread.sleep(10);
			}
		}catch(InterruptedException e){
			exec.shutdown();
			System.out.println("Timer Interrupted");
			System.out.println("添加任务总数:"+statistics.total_add_num.longValue()+",执行任务总数:"+statistics.total_excute_num.longValue()+",平均延迟:"
		            +statistics.avg_time()+",超时率:"+statistics.timeout_rate());
			logger.debug("Timer Interrupted");
		}
	}
}
